# Source code for hysop.fields.field_requirements

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np
import itertools as it

from hysop import __DEBUG__, main_size
from hysop.constants import MemoryOrdering
from hysop.tools.transposition_states import TranspositionState
from hysop.tools.htypes import to_list, to_tuple, check_instance, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.tools.decorators import debug
from hysop.topology.topology import Topology
from hysop.topology.topology_descriptor import TopologyDescriptor
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.fields.continuous_field import ScalarField
from hysop.fields.discrete_field import DiscreteScalarField

# Debug level for topology creation
#   0: no debug logs
#   1: topo creation summary for each field
#   2: topo creation details for all discrete field requirements
TOPO_CREATION_DEBUG_LEVEL = 0


def gprint(*args, **kwds):
    """Guarded print for topology-creation debugging.

    Forwards ``*args``/``**kwds`` to :func:`print` only when the module-level
    ``TOPO_CREATION_DEBUG_LEVEL`` is at least the message ``level`` keyword
    (popped from ``kwds``, defaults to 2, i.e. most verbose messages).
    """
    level = kwds.pop("level", 2)
    if TOPO_CREATION_DEBUG_LEVEL >= level:
        print(*args, **kwds)
class DiscreteFieldRequirements:
    """Topology requirements expressed by one operator for one scalar field.

    A requirement constrains the discretization of ``field`` within the
    operator's ``variables`` mapping: ghost layer bounds (min/max), allowed
    Cartesian splitting directions, memory ordering and transposition axes.
    Requirements are registered once per (operator, variables, field)
    triplet; registering the same triplet twice raises RuntimeError.
    """

    __slots__ = (
        "_operator",
        "_field",
        "_variables",
        "_topology",
        "_dim",
        "_min_ghosts",
        "_max_ghosts",
        "_can_split",
        "_axes",
        "_memory_order",
        "_registered",
        "_work_dim",
        "_topology_descriptor",
        "_header",
    )

    # Class-level registry shared by all instances: keys are
    # (id(operator), id(variables), id(field)) tuples used to detect
    # duplicate registrations.
    _registered_requirements = set()

    def __init__(
        self,
        operator,
        variables,
        field,
        min_ghosts=None,
        max_ghosts=None,
        can_split=None,
        memory_order=None,
        axes=None,
        _register=True,
        **kwds,
    ):
        """Build (and optionally register) requirements for ``field``.

        Parameters
        ----------
        operator: ComputationalGraphNode
            Operator emitting this requirement (may be None when not registering).
        variables: dict(ScalarField, Topology or TopologyDescriptor)
            Operator variables; ``variables[field]`` is the topology descriptor.
        field: ScalarField
            The scalar field this requirement applies to.
        min_ghosts, max_ghosts: array-like of int, optional
            Per-direction ghost bounds (defaults: 0 and +inf).
        can_split: array-like of bool, optional
            Per-direction splitting permission (default: all True).
        memory_order: MemoryOrdering, optional
            Required memory ordering (default: MemoryOrdering.ANY).
        axes: tuple of tuple of int, optional
            Allowed transposition axes (default: any).
        _register: bool
            When True, record this (operator, variables, field) triplet in
            the class-level registry and forbid duplicates.
        """
        if _register:
            key = (id(operator), id(variables), id(field))
            if key in self._registered_requirements:
                msg = "Operator {} has already registered requirements for field {} "
                msg += "to variables id {}."
                msg = msg.format(operator.name, field.name, id(variables))
                raise RuntimeError(msg)
            else:
                if __DEBUG__:
                    msg = "Operator {} registered requirements of field {} to variables id {}."
                    msg = msg.format(operator.name, field.name, id(variables))
                    print(msg)
                # BUGFIX: was 'self._registered_requirements.update(key)',
                # which unpacks the tuple and adds its three ids as separate
                # elements, so the duplicate check above could never match.
                # 'add' stores the key as a single tuple element.
                self._registered_requirements.add(key)
        super().__init__(**kwds)
        check_instance(field, ScalarField)
        check_instance(operator, ComputationalGraphNode, allow_none=(not _register))
        check_instance(
            variables,
            dict,
            keys=ScalarField,
            values=(Topology, TopologyDescriptor),
            allow_none=(not _register),
        )
        self._operator = operator
        self._field = field
        self._work_dim = field.dim
        self._variables = variables
        self._topology_descriptor = variables[field] if variables else None
        self._dim = field.dim
        self._header = "::{}[{}] requirements::\n".format(
            getattr(operator, "name", "UnknownOperator"),
            getattr(field, "name", "UnknownField"),
        )
        self._registered = _register
        # Go through the property setters so defaults and size checks apply.
        self.min_ghosts = min_ghosts
        self.max_ghosts = max_ghosts
        self.can_split = can_split
        self.memory_order = memory_order
        self.axes = axes

    def as_dfr(self):
        """Return self (this already is a DiscreteFieldRequirements)."""
        return self

    def copy(self):
        """Return a copy of this requirement.

        The copy is not registered (``_register=False``): copies are working
        objects merged by OperatorFieldRequirements and must not trip the
        duplicate-registration guard of __init__.
        """
        return DiscreteFieldRequirements(
            operator=self._operator,
            variables=self._variables,
            field=self._field,
            min_ghosts=self._min_ghosts,
            max_ghosts=self._max_ghosts,
            can_split=self._can_split,
            memory_order=self._memory_order,
            axes=self._axes,
            _register=False,
        )

    def is_default(self):
        """Return True when this requirement imposes no constraint."""
        return self == self._default()

    def _default(self):
        # Unregistered requirement carrying only default constraints.
        return DiscreteFieldRequirements(
            self._operator, self._variables, self._field, _register=False
        )

    def __eq__(self, other):
        eq = self.operator is other.operator
        eq &= self.variables is other.variables
        eq &= self.field is other.field
        eq &= (self.min_ghosts == other.min_ghosts).all()
        eq &= (self.max_ghosts == other.max_ghosts).all()
        eq &= (self.can_split == other.can_split).all()
        eq &= self.memory_order == other.memory_order
        eq &= self.tstates == other.tstates
        return eq

    def __hash__(self):
        # Identity of operator/variables/field combined with the value of
        # the (hashable forms of the) constraints.
        return (
            id(self.operator)
            ^ id(self.variables)
            ^ id(self.field)
            ^ hash(
                (
                    to_tuple(self.min_ghosts),
                    to_tuple(self.max_ghosts),
                    self.memory_order,
                    self.tstates,
                )
            )
        )

    def ghost_str(self, array):
        """Format a per-direction ghost array, printing non-finite entries as '+∞'."""
        inf = "+∞"
        vals = ["" + str(x) if np.isfinite(x) else inf for x in array]
        return "[{}]".format(",".join(vals)).strip()

    def __str__(self):
        return "{:15s} {:>10s}<=ghosts<{:<10s} memory_order={} can_split={} tstates={}".format(
            "{}::{}".format(
                getattr(self.operator, "name", "UnknownOperator"),
                getattr(self.field, "name", "UnknownField"),
            ),
            self.ghost_str(self.min_ghosts),
            self.ghost_str(self.max_ghosts + 1),
            self.memory_order,
            "" + str(self.can_split.view(np.int8)),
            (
                "[{}]".format(",".join("" + str(ts) for ts in self.tstates))
                if self.tstates
                else "ANY"
            ),
        )

    def get_axes(self):
        """Get the allowed transposition axes (tuple of tuples, or None for any)."""
        return self._axes

    def set_axes(self, axes):
        """Set the allowed transposition axes; each must have length ``dim``."""
        check_instance(axes, tuple, values=tuple, allow_none=True)
        if axes:
            if not all([len(_) == self._dim for _ in axes]):
                msg = f"all given axis should be of length {self._dim}, given {axes}"
                assert False, msg
        self._axes = axes

    def get_tstates(self):
        """Get the compatible transposition states (tuple), or None for any."""
        all_axes = self._axes
        if all_axes is None:
            return None
        else:
            return tuple(TranspositionState[self._dim](axes) for axes in all_axes)

    def get_memory_order(self):
        """Get the required memory ordering (MemoryOrdering)."""
        return self._memory_order

    def set_memory_order(self, memory_order):
        """Set the required memory ordering; None means MemoryOrdering.ANY."""
        check_instance(memory_order, MemoryOrdering, allow_none=True)
        if memory_order is None:
            memory_order = MemoryOrdering.ANY
        assert memory_order in (
            MemoryOrdering.C_CONTIGUOUS,
            MemoryOrdering.F_CONTIGUOUS,
            MemoryOrdering.ANY,
        ), memory_order
        self._memory_order = memory_order

    def get_min_ghosts(self):
        """Get the per-direction minimum ghost layer sizes (ndarray)."""
        return self._min_ghosts

    def set_min_ghosts(self, min_ghosts):
        """Set minimum ghosts; None defaults to zero in every direction."""
        self._min_ghosts = np.asarray(
            to_list(min_ghosts) if (min_ghosts is not None) else [0] * self.workdim
        )
        assert self.min_ghosts.size == self.workdim

    def get_max_ghosts(self):
        """Get the per-direction maximum ghost layer sizes (ndarray)."""
        return self._max_ghosts

    def set_max_ghosts(self, max_ghosts):
        """Set maximum ghosts; None defaults to +inf in every direction."""
        self._max_ghosts = np.asarray(
            to_list(max_ghosts) if (max_ghosts is not None) else [np.inf] * self.workdim
        )
        assert self.max_ghosts.size == self.workdim

    def get_can_split(self):
        """Get the per-direction splitting permissions (bool ndarray)."""
        return self._can_split

    def set_can_split(self, can_split):
        """Set splitting permissions; None defaults to splittable everywhere."""
        self._can_split = np.asarray(
            to_list(can_split) if (can_split is not None) else [1] * self.workdim,
            dtype=np.bool_,
        )
        assert self.can_split.size == self.workdim

    def get_work_dim(self):
        """Get the work dimension (= field dimension)."""
        return self._work_dim

    def get_operator(self):
        """Get the operator that emitted this requirement."""
        return self._operator

    def get_field(self):
        """Get the scalar field this requirement applies to."""
        return self._field

    def get_variables(self):
        """Get the operator variables mapping."""
        return self._variables

    def get_topology_descriptor(self):
        """Get the topology descriptor associated to the field (may be None)."""
        return self._topology_descriptor

    can_split = property(get_can_split, set_can_split)
    min_ghosts = property(get_min_ghosts, set_min_ghosts)
    max_ghosts = property(get_max_ghosts, set_max_ghosts)
    axes = property(get_axes, set_axes)
    tstates = property(get_tstates)
    memory_order = property(get_memory_order, set_memory_order)
    workdim = property(get_work_dim)
    operator = property(get_operator)
    field = property(get_field)
    variables = property(get_variables)
    topology_descriptor = property(get_topology_descriptor)

    def is_compatible_with(self, other, i=None):
        """Return True when ``other`` (a DiscreteFieldRequirements or a
        MultiFieldRequirements subgroup) can share a topology with self.

        When ``i`` is given it is the subgroup index, used only for debug
        output through gprint.
        """
        assert self.field == other.field, "field mismatch."
        if isinstance(other, DiscreteFieldRequirements):
            others = {other}
        elif isinstance(other, MultiFieldRequirements):
            if self.topology_descriptor in other.requirements.keys():
                others = other.requirements[self.topology_descriptor]
            else:
                # No requirement on the same topology descriptor: trivially ok.
                return True
        else:
            msg = f"Unknown type {other.__class__}."
            raise TypeError(msg)
        for other in others:
            assert self.workdim == other.workdim, "workdim mismatch."
            assert (
                self.topology_descriptor == other.topology_descriptor
            ), "topology_descriptor mismatch."
            if (self.field.lboundaries != other.field.lboundaries).any():
                if i is not None:
                    gprint(f" => lboundaries mismatch with subgroup {i}")
                return False
            if (self.field.rboundaries != other.field.rboundaries).any():
                if i is not None:
                    gprint(f" => rboundaries mismatch with subgroup {i}")
                return False
            if (other.max_ghosts < self.min_ghosts).any():
                if i is not None:
                    gprint(f" => ghosts incompatibility with subgroup {i}")
                return False
            if (other.min_ghosts > self.max_ghosts).any():
                if i is not None:
                    gprint(f" => ghosts incompatibility with subgroup {i}")
                return False
            # Splitting only matters when running on more than one process.
            multiprocess = main_size > 1
            if multiprocess and not (other.can_split * self.can_split).any():
                if i is not None:
                    gprint(f" => splitting incompatibility with subgroup {i}")
                return False
        if i is not None:
            gprint(f" => compatible with subgroup {i}")
        return True

    def update_requirements(self, other):
        """Merge ``other``'s constraints into self (intersection of freedoms)."""
        assert self.is_compatible_with(other)
        assert self.memory_order == other.memory_order
        # NOTE(review): get_tstates/set_axes produce tuples, which have no
        # 'intersection' method — these two statements look like they expect
        # sets; confirm the actual runtime type of tstates/axes.
        assert (self.tstates is None) or self.tstates.intersection(other.tstates)
        self.min_ghosts = np.maximum(self.min_ghosts, other.min_ghosts)
        self.max_ghosts = np.minimum(self.max_ghosts, other.max_ghosts)
        self.can_split *= other.can_split
        if self.axes:
            self.axes = self.axes.intersection(other.axes) if other.axes else self.axes
        else:
            self.axes = other.axes

    def check_topology(self, topology=None):
        """Raise RuntimeError if ``topology`` (default: the field's topology
        in self.variables) violates this requirement."""
        topology = topology or self.variables[self.field]
        check_instance(topology, Topology)
        if topology.domain.dim != self.field.dim:
            msg = "{} Dimension mismatch between field and topology.\n field={}d, topology={}d."
            msg = msg.format(self._header, self.field.dim, topology.domain.dim)
            raise RuntimeError(msg)
        if (topology.grid_resolution != self.topology_descriptor.grid_resolution).any():
            msg = "{} Grid resolution mismatch between requirement and topology.\n "
            msg += " requirement={}\n topology={}"
            msg = msg.format(
                self._header,
                self.topology_descriptor.grid_resolution,
                topology.grid_resolution,
            )
            raise RuntimeError(msg)
        if (
            topology.global_resolution != self.topology_descriptor.global_resolution
        ).any():
            msg = "{} Global resolution mismatch between requirement and topology.\n "
            msg += " requirement={}\n topology={}"
            msg = msg.format(
                self._header,
                self.topology_descriptor.global_resolution,
                topology.global_resolution,
            )
            raise RuntimeError(msg)
        if (topology.ghosts < self.min_ghosts).any():
            msg = "{} min ghosts constraint was not met.\n min={}, actual={}."
            msg = msg.format(self._header, self.min_ghosts, topology.ghosts)
            raise RuntimeError(msg)
        if (topology.ghosts > self.max_ghosts).any():
            msg = "{} max ghosts constraint was not met.\n max={}, actual={}."
            msg = msg.format(self._header, self.max_ghosts, topology.ghosts)
            raise RuntimeError(msg)

    def check_discrete_topology_state(self, state):
        """Raise RuntimeError if a CartesianTopologyState violates the
        required memory ordering or transposition state."""
        from hysop.topology.cartesian_topology import CartesianTopologyState

        check_instance(state, CartesianTopologyState)
        if (
            (self.memory_order is not None)
            and (self.memory_order is not MemoryOrdering.ANY)
            and (self.memory_order != state.memory_order)
        ):
            msg = "{} memory_order mismatch between requirement and topology state.\n reqs={}, state={}."
            msg = msg.format(self._header, self.memory_order, state.memory_order)
            raise RuntimeError(msg)
        if (self.tstates is not None) and (state.tstate not in self.tstates):
            msg = "{} Transposition state mismatch between requirement and topology state.\n"
            msg += " reqs=[{}], state={}."
            msg = msg.format(
                self._header, ",".join([str(x) for x in self.tstates]), state.tstate
            )
            raise RuntimeError(msg)

    def check_state(self, dfield):
        """Check both topology and topology state of a discrete field."""
        check_instance(dfield, DiscreteScalarField)
        self.check_topology(dfield.topology)
        self.check_discrete_topology_state(dfield.state)

    def set_and_check_topology(self, topology):
        """
        Check topology and replace a TopologyDescriptor by a Topology instance
        in self.variables[self.field].
        """
        assert isinstance(topology, Topology)
        assert not isinstance(self.variables[self.field], Topology) or (
            self.variables[self.field] == topology
        )
        self.check_topology(topology)
        self.variables[self.field] = topology
class MultiFieldRequirements:
    """Aggregates every DiscreteFieldRequirements expressed for one scalar
    field, grouped per topology descriptor."""

    __slots__ = ("field", "requirements", "built", "common_can_split")

    def __init__(self, field):
        # field: the continuous scalar field these requirements refer to
        self.field = field
        # requirements: dict {topology_descriptor: set(DiscreteFieldRequirements)}
        self.requirements = {}
        # built: set to True once build_topologies() has run
        self.built = False
        # common_can_split: splitting mask shared by a compatible subgroup,
        # computed by _split()
        self.common_can_split = None

    def copy(self):
        """Return a clone with shallow-copied requirement sets.

        NOTE(review): 'common_can_split' is not carried over to the clone
        (it is recomputed during build_topologies) — confirm intended.
        """
        clone = MultiFieldRequirements(field=self.field)
        clone.built = self.built
        clone.requirements = {
            td: reqs.copy() for (td, reqs) in self.requirements.items()
        }
        return clone

    def as_dfr(self):
        """Return the single contained DiscreteFieldRequirements, or None
        when empty (asserts there is at most one)."""
        count = self.nrequirements()
        if count == 0:
            return None
        assert count == 1
        return next(iter(tuple(self.requirements.values())[0]))

    def nrequirements(self):
        """Total number of discrete requirements over all descriptors."""
        return sum(map(len, self.requirements.values()))

    def update(self, *update_reqs):
        """Merge requirements (DiscreteFieldRequirements instances,
        MultiFieldRequirements containers, or None) into this container."""
        for ureq in update_reqs:
            if ureq is None:
                continue
            if isinstance(ureq, MultiFieldRequirements):
                pairs = ureq.requirements.items()
            else:
                pairs = [(ureq.topology_descriptor, [ureq])]
            for td, reqs in pairs:
                self.requirements.setdefault(td, set()).update(reqs)

    def build_topologies(self):
        """Split requirements into compatible subgroups, determine common
        splitting axes and create (or pick) topologies for each subgroup."""
        gprint(
            f"\nMULTIFIELD_REQUIREMENTS.BUILD_TOPOLOGIES() for field {self.field.name}"
        )
        if self.built:
            return
        gprint(" 1) SPLITTING REQUIREMENTS IN COMPATIBLE SUBGROUPS:")
        multi_process = tuple(self.requirements.keys())[0].mpi_params.size > 1
        subgroups = self._split(multi_process)
        gprint(
            " 2) DETERMINING COMMON CARTESIAN TOPOLOGY SPLITTING AXES (if possible):"
        )
        can_split = 1
        for i, group in enumerate(subgroups):
            subgroup_can_split = group.common_can_split
            can_split *= subgroup_can_split
            gprint(f" *subgroup{i}.can_split = {subgroup_can_split}")
        gprint(
            " => Global available split directions for field {} are {}".format(
                self.field.name, can_split
            )
        )
        if can_split.any():
            gprint(
                " => Enforcing this configuration for Cartesian topology creation."
            )
            for group in subgroups:
                group.common_can_split = can_split
        else:
            gprint(" => No common splitting axes found between all subgroups.")
        gprint(" 3) BUILDING TOPOLOGIES:")
        all_topologies = set()
        for i, group in enumerate(subgroups):
            gprint(f" *building topology for requirement group {i}")
            subgroup_topologies = group._build_compatible_topologies()
            all_topologies.update(subgroup_topologies)
            gprint(
                f" Summary of topologies for field {self.field.name}, subgroup {i}:"
            )
            for topo in subgroup_topologies:
                gprint(f" *{topo.short_description()}")
            gprint("", level=1)
        gprint(f" Summary of topologies for field {self.field.name}:")
        for topo in all_topologies:
            gprint(f" *{topo.short_description()}")
        gprint("", level=1)
        self.built = True

    def all_compatible(self):
        """Return True when every pair of requirements sharing a topology
        descriptor is mutually compatible."""
        for td in self.requirements:
            reqs = self.requirements[td]
            assert len(reqs) > 0
            for ra, rb in it.combinations(reqs, 2):
                if not ra.is_compatible_with(rb):
                    return False
        return True

    def _split(self, multi_process):
        # Partition requirements into subgroups of mutually compatible ones,
        # then compute each subgroup's common splitting mask.
        groups = []
        for lreq in self.requirements.values():
            # sort for deterministic grouping across runs
            for req in sorted(lreq, key=str):
                gprint(f" *Requirement {req}")
                placed = False
                for i, group in enumerate(groups):
                    if req.is_compatible_with(group, i):
                        group.update(req)
                        placed = True
                        break
                if not placed:
                    gprint(
                        " => this requirement is not compatible with any existing requirement group, creating a new one (subgroup {}).".format(
                            len(groups)
                        )
                    )
                    fresh = MultiFieldRequirements(self.field)
                    fresh.update(req)
                    groups.append(fresh)
        assert self.nrequirements() == sum(g.nrequirements() for g in groups)
        for group in groups:
            for td, reqs in group.requirements.items():
                if isinstance(td, Topology):
                    dim = td.domain_dim
                else:
                    dim = td.dim
                can_split = npw.integer_ones(shape=(dim,))
                for req in reqs:
                    if isinstance(req.topology_descriptor, Topology):
                        can_split *= req.topology_descriptor.proc_shape > 1
                    else:
                        can_split *= req.can_split
                assert (not multi_process) or can_split.any()
                group.common_can_split = can_split
        return groups

    def _build_compatible_topologies(self):
        # Choose or create topologies satisfying every requirement of this
        # (already compatible) group; returns the set of topologies used.
        assert self.all_compatible()
        all_topologies = set()
        for td, reqs in self.requirements.items():
            if isinstance(td, Topology):
                gprint(f" -Topology {td.short_description()}")
                dim = td.domain_dim
                known_topologies = {td}
            else:
                gprint(f" -Topology descriptor {td}")
                dim = td.dim
                known_topologies = set()
            unknown_topologies = set()
            ghosts = npw.integer_zeros(shape=(dim,))
            can_split = npw.integer_ones(shape=(dim,))
            for req in reqs:
                if isinstance(req.topology_descriptor, Topology):
                    req.check_topology()
                    known_topologies.add(req.topology_descriptor)
                else:
                    # accumulate the strongest ghost/splitting constraints
                    ghosts = np.maximum(ghosts, req.min_ghosts)
                    can_split *= req.can_split
                    unknown_topologies.add(req)
            for req in unknown_topologies:
                gprint(
                    " >choose or create topology from {} existing topologies:".format(
                        len(known_topologies)
                    ),
                    end="",
                )
                topo = req.topology_descriptor.choose_or_create_topology(
                    known_topologies, ghosts=ghosts, cutdirs=self.common_can_split
                )
                if topo in known_topologies:
                    gprint(f" choosed existing topology {topo.pretty_tag}.")
                else:
                    gprint(f"\n Created topology {topo.short_description()}")
                    known_topologies.add(topo)
                req.set_and_check_topology(topo)
            all_topologies.update(known_topologies)
        return all_topologies
class OperatorFieldRequirements:
    """Per-field input and output requirement containers for one operator."""

    __slots__ = ("_input_field_requirements", "_output_field_requirements")

    def __init__(
        self, input_field_requirements=None, output_field_requirements=None, **kwds
    ):
        super().__init__(**kwds)
        check_instance(
            input_field_requirements,
            dict,
            keys=ScalarField,
            values=MultiFieldRequirements,
            allow_none=True,
        )
        self._input_field_requirements = first_not_None(input_field_requirements, {})
        check_instance(
            output_field_requirements,
            dict,
            keys=ScalarField,
            values=MultiFieldRequirements,
            allow_none=True,
        )
        self._output_field_requirements = first_not_None(output_field_requirements, {})

    def get_input_field_requirements(self):
        """Get the {field: MultiFieldRequirements} mapping for inputs."""
        return self._input_field_requirements

    def get_output_field_requirements(self):
        """Get the {field: MultiFieldRequirements} mapping for outputs."""
        return self._output_field_requirements

    input_field_requirements = property(get_input_field_requirements)
    output_field_requirements = property(get_output_field_requirements)

    def update(self, requirements):
        """Merge another OperatorFieldRequirements into this one."""
        check_instance(requirements, OperatorFieldRequirements)
        self.update_inputs(requirements._input_field_requirements)
        self.update_outputs(requirements._output_field_requirements)

    def update_inputs(self, input_field_requirements):
        """Merge per-field input requirements into this container."""
        self._update_requirements(
            self._input_field_requirements, input_field_requirements
        )

    def update_outputs(self, output_field_requirements):
        """Merge per-field output requirements into this container."""
        self._update_requirements(
            self._output_field_requirements, output_field_requirements
        )

    def _update_requirements(self, self_reqs, reqs):
        # Normalize every value to a MultiFieldRequirements copy and merge it
        # into self_reqs, keyed by field.
        check_instance(
            reqs,
            dict,
            keys=ScalarField,
            values=(DiscreteFieldRequirements, MultiFieldRequirements, type(None)),
        )
        for field, freq in reqs.items():
            if freq is None:
                continue
            freq = freq.copy()
            if not isinstance(freq, MultiFieldRequirements):
                single = freq
                freq = MultiFieldRequirements(field)
                freq.update(single)
            if field in self_reqs:
                self_reqs[field].update(freq)
            else:
                self_reqs[field] = freq

    def iter_input_requirements(self):
        """
        Iterates over (field, topology_descriptor, field_requirement)
        for all input requirements.
        """
        for field, mreqs in self.input_field_requirements.items():
            for td, reqs in mreqs.requirements.items():
                for req in reqs:
                    yield (field, td, req)

    def iter_output_requirements(self):
        """
        Iterates over (field, topology_descriptor, field_requirement)
        for all output requirements.
        """
        for field, mreqs in self.output_field_requirements.items():
            for td, reqs in mreqs.requirements.items():
                for req in reqs:
                    yield (field, td, req)

    def iter_requirements(self):
        """
        Iterates over (is_input, field, topology_descriptor, field_requirement)
        for all inputs and outputs.
        """
        # NOTE(review): zip_longest pairs True/False only with the first
        # yielded item (subsequent items get a None fill on the left), which
        # does not match the 4-tuples described above — confirm against
        # callers before changing.
        it0 = it.zip_longest((True,), self.iter_input_requirements())
        it1 = it.zip_longest((False,), self.iter_output_requirements())
        return it.chain(it0, it1)

    def _get_requirement(self, field, field_requirements):
        """
        Get unique requirement and topology descriptor for given field, if it exists.
        This is a facility for ComputationalGraphOperators to retrieve their unique
        i per field requirements.
        If field is not an hysop.fields.continuous_field.ScalarField, a TypeError is raised.
        If field is not known, an Attribute error is raised.
        If multiple topology_descriptors or requirements are present (ie. there is no
        unicity), this will raise a RuntimeError.
        """
        check_instance(field, ScalarField)
        if field not in field_requirements:
            msg = f"No requirements found for field {field.name}."
            raise AttributeError(msg)
        freqs = field_requirements[field].requirements
        if len(freqs.keys()) > 1:
            msg = f"Multiple topology descriptors are present for field {field.name}."
            raise RuntimeError(msg)
        if len(freqs.keys()) == 0:
            msg = f"No topology descriptors are present for field {field.name}."
            raise RuntimeError(msg)
        td = tuple(freqs.keys())[0]
        reqs = freqs[td]
        if len(reqs) > 1:
            msg = f"Multiple requirements are present for field {field.name}."
            raise RuntimeError(msg)
        return (td, next(iter(reqs)))

    def get_input_requirement(self, field):
        """Get the unique (topology_descriptor, requirement) pair for an input field."""
        return self._get_requirement(field, self._input_field_requirements)

    def get_output_requirement(self, field):
        """Get the unique (topology_descriptor, requirement) pair for an output field."""
        return self._get_requirement(field, self._output_field_requirements)

    @debug
    def build_topologies(self):
        """Merge input and output requirements per field and build topologies."""
        fields = set(self._input_field_requirements.keys()).union(
            self._output_field_requirements.keys()
        )
        # enforce deterministic iteration
        for field in sorted(fields, key=lambda x: f"{x.name}::{x.short_description()}"):
            merged = MultiFieldRequirements(field)
            merged.update(
                self._input_field_requirements.get(field, None),
                self._output_field_requirements.get(field, None),
            )
            merged.build_topologies()